/*
 * Copyright (c) 2011-2018, Meituan Dianping. All Rights Reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.dianping.zebra.shard.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import com.dianping.zebra.filter.DefaultJdbcFilterChain;
import com.dianping.zebra.filter.JdbcFilter;
import com.dianping.zebra.group.router.LocalContextReadWriteStrategy;
import com.dianping.zebra.log.Logger;
import com.dianping.zebra.log.LoggerFactory;
import com.dianping.zebra.shard.exception.ShardBatchInsertException;
import com.dianping.zebra.shard.jdbc.parallel.*;
import com.dianping.zebra.shard.jdbc.unsupport.UnsupportedShardStatement;
import com.dianping.zebra.shard.merge.MergeContext;
import com.dianping.zebra.shard.merge.RowData;
import com.dianping.zebra.shard.merge.ShardResultSetMerger;
import com.dianping.zebra.shard.merge.orderby.OrderByDataMerger;
import com.dianping.zebra.shard.parser.*;
import com.dianping.zebra.shard.router.RouterResult;
import com.dianping.zebra.shard.router.RouterResult.RouterTarget;
import com.dianping.zebra.shard.router.ShardRouter;
import com.dianping.zebra.util.JDBCUtils;
import com.dianping.zebra.util.SqlType;
import com.dianping.zebra.util.SqlUtils;

/**
 * @author Leo Liang
 * @author hao.zhu
 * 
 */
public class ShardStatement extends UnsupportedShardStatement implements Statement {

	protected static final Logger LOGGER = LoggerFactory.getLogger(ShardStatement.class);

	// Router that turns a logical SQL into its physical targets (see routingAndCheck).
	private ShardRouter router;

	// Owning shard connection; real connections to the physical databases are obtained from it.
	protected ShardConnection connection;

	// Set to true by close(); checked by checkClosed().
	private boolean closed;

	// Plain flag exposed through isReadOnly()/setReadOnly(); not consulted elsewhere in this class.
	private boolean readOnly;

	// Passed through to the shard connection whenever a real connection is requested.
	protected boolean autoCommit = true;

	// -1 means "not configured"; createStatement() then falls back to a simpler createStatement overload.
	private int resultSetType = -1;

	// -1 means "not configured" (see createStatement()).
	private int resultSetConcurrency = -1;

	// -1 means "not configured" (see createStatement()).
	private int resultSetHoldability = -1;

	// Lower-case marker searched for in beforeQuery() to detect "select @@identity" style queries.
	private static final String SELECT_GENERATEDKEY_SQL_PATTERN = "@@identity";

	// Lower-case marker searched for in beforeQuery() to detect "select last_insert_id()" style queries.
	private static final String SELECT_LAST_INSERT_ID = "last_insert_id()";

	// Result sets handed out by this statement; closed and cleared in close().
	protected Set<ResultSet> attachedResultSets = new HashSet<ResultSet>();

	// Physical statements created on the real connections; closed and cleared in close().
	protected List<Statement> actualStatements = new ArrayList<Statement>();

	// Result of the most recent query, or null after an update.
	protected ResultSet results;

	// Affected-row count of the most recent update, or -1 after a query.
	protected int updateCount = -1;

	// Generated keys captured by the most recent single-target insert/replace that requested them.
	protected ResultSet generatedKey;

	// Filters wrapped around query, update, routing and merge steps; may be null or empty.
	protected final List<JdbcFilter> filters;

	private int concurrencyLevel = 1; // per-database concurrency level

	/**
	 * Creates a statement that runs its operations through the given filters.
	 *
	 * @param filters
	 *           filters to apply; the execution paths in this class treat {@code null} or an empty list as "no filters"
	 */
	public ShardStatement(List<JdbcFilter> filters) {
		this.filters = filters;
	}

	/**
	 * Shared implementation of the {@code execute(...)} overloads: dispatches to {@code executeQuery} or to the
	 * matching {@code executeUpdate} overload depending on the SQL type.
	 *
	 * @param sql
	 *           the logical SQL to execute
	 * @param autoGeneratedKeys
	 *           generated-key flag, or {@code -1} when the caller did not supply one
	 * @param columnIndexes
	 *           generated-key column indexes, or {@code null} when not supplied
	 * @param columnNames
	 *           generated-key column names, or {@code null} when not supplied
	 * @return {@code true} if the SQL was a select (a result set is available), {@code false} if it was an update
	 * @throws SQLException
	 *            if the SQL type is not select, insert, update, delete or replace, or if execution fails
	 */
	private boolean executeInternal(String sql, int autoGeneratedKeys, int[] columnIndexes, String[] columnNames)
	      throws SQLException {
		SqlType sqlType = getSqlType(sql);

		if (sqlType == SqlType.SELECT || sqlType == SqlType.SELECT_FOR_UPDATE) {
			executeQuery(sql);
			return true;
		}

		if (sqlType == SqlType.INSERT || sqlType == SqlType.UPDATE || sqlType == SqlType.DELETE
		      || sqlType == SqlType.REPLACE) {
			// The first supplied generated-key option wins; with none supplied the plain overload is used.
			if (autoGeneratedKeys != -1) {
				executeUpdate(sql, autoGeneratedKeys);
			} else if (columnIndexes != null) {
				executeUpdate(sql, columnIndexes);
			} else if (columnNames != null) {
				executeUpdate(sql, columnNames);
			} else {
				executeUpdate(sql);
			}

			return false;
		}

		throw new SQLException("only select, insert, update, delete, replace sql is supported");
	}

	/**
	 * Shared implementation of the {@code executeUpdate(...)} overloads. Verifies the statement is open and then
	 * runs the update, passing through every configured filter before reaching {@code executeUpdateWithFilter}.
	 *
	 * @param sql
	 *           the logical update SQL
	 * @param autoGeneratedKeys
	 *           generated-key flag, or {@code -1} when not supplied
	 * @param columnIndexes
	 *           generated-key column indexes, or {@code null}
	 * @param columnNames
	 *           generated-key column names, or {@code null}
	 * @return the number of affected rows
	 * @throws SQLException
	 *            if the statement is closed or execution fails
	 */
	private int executeUpdateInternal(String sql, int autoGeneratedKeys, int[] columnIndexes, String[] columnNames)
	      throws SQLException {
		checkClosed();

		// Without filters there is nothing to chain: execute directly.
		if (filters == null || filters.isEmpty()) {
			return executeUpdateWithFilter(sql, autoGeneratedKeys, columnIndexes, columnNames);
		}

		JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
			@Override
			public int executeShardUpdate(ShardStatement source, String sql, int autoGeneratedKeys,
			      int[] columnIndexes, String[] columnNames, JdbcFilter chain) throws SQLException {
				// Once every filter has been visited, perform the real update.
				if (index >= filters.size()) {
					return source.executeUpdateWithFilter(sql, autoGeneratedKeys, columnIndexes, columnNames);
				}

				JdbcFilter next = filters.get(index++);
				return next.executeShardUpdate(source, sql, autoGeneratedKeys, columnIndexes, columnNames, chain);
			}
		};

		return chain.executeShardUpdate(this, sql, autoGeneratedKeys, columnIndexes, columnNames, chain);
	}

	/**
	 * Routes the update SQL and executes the resulting physical SQLs; this is the terminal step of the filter chain
	 * built in {@code executeUpdateInternal}.
	 * <p>
	 * A single physical SQL is executed directly on the calling thread. Otherwise the physical SQLs are wrapped into
	 * {@code StatementExecuteUpdateCallable} tasks and handed to {@code SQLThreadPoolExecutor}:
	 * <ul>
	 * <li>when the concurrency level is at most 1 or auto-commit is off, all SQLs of one database form one task and
	 * use the connection returned by {@code getRealConnection};</li>
	 * <li>otherwise the SQLs of one database are distributed round-robin over up to {@code concurrencyLevel} groups,
	 * each group with its own connection from {@code getRealConcurrentConnection} and its own task.</li>
	 * </ul>
	 * For batch inserts a {@code TaskExecuteResult} is attached to every task and passed along in a
	 * {@code ShardBatchInsertException} on failure.
	 *
	 * @param sql
	 *           the logical update SQL
	 * @param autoGeneratedKeys
	 *           generated-key flag, or {@code -1} when not supplied
	 * @param columnIndexes
	 *           generated-key column indexes, or {@code null}
	 * @param columnNames
	 *           generated-key column names, or {@code null}
	 * @return the sum of the affected rows over all physical SQLs
	 * @throws SQLException
	 *            if routing or any physical execution fails
	 */
	private int executeUpdateWithFilter(String sql, int autoGeneratedKeys, int[] columnIndexes, String[] columnNames)
	      throws SQLException {
		RouterResult routerTarget = routingAndCheck(sql, null);

		int affectedRows = 0;
		if (isSingleTarget(routerTarget)) {
			// if has only one sql,then serial execute it
			for (RouterTarget targetedSql : routerTarget.getSqls()) {
				for (String executableSql : targetedSql.getSqls()) {
					Connection conn = connection.getRealConnection(targetedSql.getDatabaseName(), autoCommit);
					Statement stmt = createStatement(conn);
					actualStatements.add(stmt);

					// The first supplied generated-key option wins; with none supplied, generated keys are
					// still requested from the driver.
					if (autoGeneratedKeys != -1) {
						affectedRows += stmt.executeUpdate(executableSql, autoGeneratedKeys);
					} else if (columnIndexes != null) {
						affectedRows += stmt.executeUpdate(executableSql, columnIndexes);
					} else if (columnNames != null) {
						affectedRows += stmt.executeUpdate(executableSql, columnNames);
					} else {
						affectedRows += stmt.executeUpdate(executableSql, Statement.RETURN_GENERATED_KEYS);
					}

					// Generated keys are only captured when the caller explicitly asked for them.
					SqlType sqlType = SqlUtils.getSqlType(sql);
					if ((SqlType.INSERT == sqlType || SqlType.REPLACE == sqlType)
					      && (autoGeneratedKeys != -1 || columnNames != null || columnIndexes != null)) {
						this.generatedKey = stmt.getGeneratedKeys();
					}
				}
			}
		} else {
			// if has multiple sqls, then parallel execute them
			List<Callable<UpdateResult>> tasks = new ArrayList<Callable<UpdateResult>>();
			List<TaskExecuteResult> taskExecuteResults = new ArrayList<TaskExecuteResult>();
			int concurrencyLevel = this.concurrencyLevel;
			boolean isBatchInsert = routerTarget.isBatchInsert();
			// a concurrency level supplied by the router result overrides the statement-level one
			if (routerTarget.getConcurrencyLevel() > 0) {
				concurrencyLevel = routerTarget.getConcurrencyLevel();
			}

			for (RouterTarget targetedSql : routerTarget.getSqls()) {
				if (concurrencyLevel <= 1 || !autoCommit) {
					// Insertion-ordered map, consistent with the other branches, so the sqls are kept in
					// routing order instead of hash order.
					Map<String, Statement> stmtSqlMap = new LinkedHashMap<String, Statement>();
					for (String executableSql : targetedSql.getSqls()) {
						Connection conn = connection.getRealConnection(targetedSql.getDatabaseName(), autoCommit);
						Statement stmt = createStatement(conn);
						actualStatements.add(stmt);
						stmtSqlMap.put(executableSql, stmt);
					}

					StatementExecuteUpdateCallable task = new StatementExecuteUpdateCallable(stmtSqlMap, autoGeneratedKeys,
					      columnIndexes, columnNames);
					if (isBatchInsert) {
						TaskExecuteResult ter = new TaskExecuteResult(targetedSql.getDatabaseName(),
						      targetedSql.getPhysicalTables(), targetedSql.getSqls());
						task.setTaskExecuteResult(ter);
						taskExecuteResults.add(ter);
					}
					tasks.add(task);
				} else {
					int index = 0;
					List<Map<String, Statement>> stmtSqlMapList = new ArrayList<Map<String, Statement>>(concurrencyLevel);
					Connection[] connections = new Connection[concurrencyLevel];
					connection.resetConcurrentConnectionIndexes();

					TaskExecuteResult[] terArray = new TaskExecuteResult[concurrencyLevel];
					List<String> physicalTables = targetedSql.getPhysicalTables();

					for (String executableSql : targetedSql.getSqls()) {
						// round-robin assignment of sqls to connection groups
						int groupIndex = index++ % concurrencyLevel;
						Connection conn = connections[groupIndex];
						if (conn == null) {
							conn = connection.getRealConcurrentConnection(targetedSql.getDatabaseName(), autoCommit);
							connections[groupIndex] = conn;
						}
						Statement stmt = createStatement(conn);
						actualStatements.add(stmt);
						Map<String, Statement> stmtSqlMap = null;
						if (groupIndex < stmtSqlMapList.size()) {
							stmtSqlMap = stmtSqlMapList.get(groupIndex);
						} else {
							stmtSqlMap = new LinkedHashMap<String, Statement>();
							stmtSqlMapList.add(stmtSqlMap);
						}
						stmtSqlMap.put(executableSql, stmt);

						// record physical table and sql
						if (isBatchInsert) {
							TaskExecuteResult ter = terArray[groupIndex];
							if (ter == null) {
								ter = new TaskExecuteResult(targetedSql.getDatabaseName());
								terArray[groupIndex] = ter;
							}
							// index was already incremented above, so index - 1 is the position of this sql
							if (physicalTables != null && index - 1 < physicalTables.size()) {
								ter.addTableAndSql(physicalTables.get(index - 1), executableSql);
							}
						}
					}

					for (int i = 0; i < stmtSqlMapList.size(); ++i) {
						Map<String, Statement> stmtSqlMap = stmtSqlMapList.get(i);
						if (stmtSqlMap != null && !stmtSqlMap.isEmpty()) {
							StatementExecuteUpdateCallable task = new StatementExecuteUpdateCallable(stmtSqlMap,
							      autoGeneratedKeys, columnIndexes, columnNames);
							if (isBatchInsert) {
								task.setTaskExecuteResult(terArray[i]);
								taskExecuteResults.add(terArray[i]);
							}
							tasks.add(task);
						}
					}
				}
			}

			List<Future<UpdateResult>> futures = null;
			try {
				futures = SQLThreadPoolExecutor.getInstance(false).invokeSQLs(tasks);
			} catch (SQLException e) {
				throw (isBatchInsert ? new SQLException(new ShardBatchInsertException(e.getMessage(), e.getCause(),
				      taskExecuteResults)) : e);
			}
			for (Future<UpdateResult> f : futures) {
				try {
					UpdateResult updateResult = f.get();
					affectedRows += updateResult.getAffectedRows();
				} catch (Exception e) {
					// normally can't be here
					if (e instanceof InterruptedException) {
						// keep the interruption visible to the caller instead of swallowing it
						Thread.currentThread().interrupt();
					}
					throw new SQLException(isBatchInsert ? new ShardBatchInsertException(e, taskExecuteResults) : e);
				}
			}
		}

		this.results = null;
		this.updateCount = affectedRows;

		return affectedRows;
	}

	/**
	 * Special handling for queries such as {@code SELECT @@IDENTITY AS A} or {@code SELECT LAST_INSERT_ID()}: these
	 * need the value produced by the previous insert, so they are answered from the generated keys this statement
	 * captured earlier instead of being routed.
	 *
	 * @param sql
	 *           the query about to be executed; may be {@code null}
	 * @return a result set backed by the captured generated keys when the SQL is such a query and generated keys are
	 *         available, otherwise {@code null} (the caller then executes the query normally)
	 * @throws SQLException
	 *            if the captured generated-key result set cannot be rewound
	 */
	protected ResultSet beforeQuery(String sql) throws SQLException {
		ResultSet generatedKey = this.generatedKey;
		if (generatedKey == null || sql == null) {
			return null;
		}

		// Locale-independent lower-casing: with e.g. a Turkish default locale "IDENTITY" would not become
		// "identity" and the markers below would never match.
		String normalizedSql = sql.toLowerCase(Locale.ROOT);
		if (!normalizedSql.contains(SELECT_GENERATEDKEY_SQL_PATTERN) && !normalizedSql.contains(SELECT_LAST_INSERT_ID)) {
			return null;
		}

		List<ResultSet> rsList = new ArrayList<ResultSet>();
		// rewind so the caller reads the keys from the start
		generatedKey.beforeFirst();
		rsList.add(generatedKey);

		ShardResultSet rs = new ShardResultSet();
		rs.setStatement(this);
		rs.setResultSets(rsList);

		attachedResultSets.add(rs);

		this.results = rs;
		this.updateCount = -1;

		return this.results;
	}

	/**
	 * Guards operations that require an open statement.
	 *
	 * @throws SQLException
	 *            if {@link #close()} has already been called on this statement
	 */
	protected void checkClosed() throws SQLException {
		if (closed) {
			throw new SQLException("No operations allowed after statement closed.");
		}
	}

	/**
	 * Closes every attached result set and every physical statement created by this statement. Calling it on an
	 * already closed statement is a no-op. Failures of individual closes are collected and reported together after
	 * all resources have been attempted.
	 *
	 * @throws SQLException
	 *            if closing any result set or statement failed
	 */
	@Override
	public void close() throws SQLException {
		if (closed) {
			return;
		}

		List<SQLException> failures = new ArrayList<SQLException>();

		try {
			// result sets first, then the physical statements that produced them
			for (ResultSet attached : attachedResultSets) {
				try {
					attached.close();
				} catch (SQLException closeFailure) {
					failures.add(closeFailure);
				}
			}

			for (Statement actual : actualStatements) {
				try {
					actual.close();
				} catch (SQLException closeFailure) {
					failures.add(closeFailure);
				}
			}
		} finally {
			// the statement counts as closed even if something unexpected escaped above
			results = null;
			actualStatements.clear();
			attachedResultSets.clear();
			closed = true;
		}

		JDBCUtils.throwSQLExceptionIfNeeded(failures);
	}

	/**
	 * Creates a physical statement on the given real connection, using the most specific {@code createStatement}
	 * overload for which this statement has configured values ({@code -1} means "not configured").
	 *
	 * @param connection
	 *           the real connection to create the statement on
	 * @return the newly created physical statement
	 * @throws SQLException
	 *            if the connection fails to create the statement
	 */
	private Statement createStatement(Connection connection) throws SQLException {
		boolean typeAndConcurrencySet = this.resultSetType != -1 && this.resultSetConcurrency != -1;
		if (!typeAndConcurrencySet) {
			return connection.createStatement();
		}

		if (this.resultSetHoldability != -1) {
			return connection.createStatement(this.resultSetType, this.resultSetConcurrency, this.resultSetHoldability);
		}

		return connection.createStatement(this.resultSetType, this.resultSetConcurrency);
	}

	/**
	 * Validates a routing result before it is executed. Currently only rejects a missing result.
	 *
	 * @param routerTarget
	 *           the routing result to validate
	 * @throws SQLException
	 *            if {@code routerTarget} is {@code null}
	 */
	protected void executableCheck(RouterResult routerTarget) throws SQLException {
		if (routerTarget == null) {
			throw new SQLException("No router return value.");
		}
		// TODO more restrictions could be added here
	}

	/**
	 * Executes the SQL without any generated-key option.
	 *
	 * @return {@code true} if the SQL was a select, {@code false} if it was an update
	 */
	@Override
	public boolean execute(String sql) throws SQLException {
		return executeInternal(sql, -1, null, null);
	}

	/**
	 * Executes the SQL, forwarding the generated-key flag to the update path.
	 *
	 * @return {@code true} if the SQL was a select, {@code false} if it was an update
	 */
	@Override
	public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
		return executeInternal(sql, autoGeneratedKeys, null, null);
	}

	/**
	 * Executes the SQL, forwarding the generated-key column indexes to the update path.
	 *
	 * @return {@code true} if the SQL was a select, {@code false} if it was an update
	 */
	@Override
	public boolean execute(String sql, int[] columnIndexes) throws SQLException {
		return executeInternal(sql, -1, columnIndexes, null);
	}

	/**
	 * Executes the SQL, forwarding the generated-key column names to the update path.
	 *
	 * @return {@code true} if the SQL was a select, {@code false} if it was an update
	 */
	@Override
	public boolean execute(String sql, String[] columnNames) throws SQLException {
		return executeInternal(sql, -1, null, columnNames);
	}

	/**
	 * Executes a query. Verifies the statement is open and then runs the query, passing through every configured
	 * filter before reaching {@code executeQueryWithFilter}.
	 *
	 * @param sql
	 *           the logical query SQL
	 * @return the result set of the query
	 * @throws SQLException
	 *            if the statement is closed or execution fails
	 */
	@Override
	public ResultSet executeQuery(String sql) throws SQLException {
		checkClosed();

		// Without filters there is nothing to chain: execute directly.
		if (filters == null || filters.isEmpty()) {
			return executeQueryWithFilter(sql);
		}

		JdbcFilter chain = new DefaultJdbcFilterChain(filters) {
			@Override
			public ResultSet executeShardQuery(ShardStatement source, String sql, JdbcFilter chain) throws SQLException {
				// Once every filter has been visited, perform the real query.
				if (index >= filters.size()) {
					return source.executeQueryWithFilter(sql);
				}

				JdbcFilter next = filters.get(index++);
				return next.executeShardQuery(source, sql, chain);
			}
		};

		return chain.executeShardQuery(this, sql, chain);
	}

	/**
	 * Executes a query after the filter chain has been traversed. Generated-key lookups recognised by
	 * {@code beforeQuery} are answered directly; every other query is routed and executed either as a split
	 * order-by/limit query or as a normal select.
	 *
	 * @param sql
	 *           the logical query SQL
	 * @return the result set of the query
	 * @throws SQLException
	 *            if routing or execution fails
	 */
	private ResultSet executeQueryWithFilter(String sql) throws SQLException {
		ResultSet generatedKeyResult = beforeQuery(sql);
		if (generatedKeyResult != null) {
			attachedResultSets.add(generatedKeyResult);
			this.updateCount = -1;
			this.results = generatedKeyResult;
			return this.results;
		}

		RouterResult routed = routingAndCheck(sql, null);

		ShardResultSet shardResultSet = new ShardResultSet();
		shardResultSet.setStatement(this);
		shardResultSet.setRouterTarget(routed);
		attachedResultSets.add(shardResultSet);
		this.results = shardResultSet;
		this.updateCount = -1;

		MergeContext mergeContext = routed.getMergeContext();

		// A single query with order by and limit fetches its data by being split into several queries.
		if (mergeContext.isOrderBySplitSql()) {
			return executeOrderyByLimitQuery(shardResultSet, sql, routed);
		}

		normalSelectExecute(shardResultSet, sql, routed);
		return this.results;
	}

	/**
	 * Executes the routed physical queries, adds their result sets to {@code rs} and initialises {@code rs}. With
	 * filters configured, {@code rs.init()} runs at the end of the filter chain's {@code shardMerge} step.
	 *
	 * @param rs
	 *           the shard result set that receives the physical result sets
	 * @param sql
	 *           the logical query SQL (not used by this method)
	 * @param routerTarget
	 *           the routing result describing the physical queries
	 * @return {@code rs}
	 * @throws SQLException
	 *            if a physical query or the merge fails
	 */
	private ResultSet normalSelectExecute(ShardResultSet rs, String sql, RouterResult routerTarget) throws SQLException {
		for (ResultSet shardResult : executeQueryByOriginal(routerTarget)) {
			rs.addResultSet(shardResult);
		}

		// Without filters the merge is triggered directly.
		if (this.filters == null || this.filters.isEmpty()) {
			rs.init();
			return rs;
		}

		JdbcFilter chain = new DefaultJdbcFilterChain(this.filters) {
			@Override
			public void shardMerge(ShardResultSet rs, JdbcFilter chain) throws SQLException {
				// Once every filter has been visited, perform the real merge.
				if (index >= filters.size()) {
					rs.init();
					return;
				}

				filters.get(index++).shardMerge(rs, chain);
			}
		};
		chain.shardMerge(rs, chain);

		return rs;
	}

	/**
	 * Executes an order-by/limit query that the router marked as "order by split".
	 * <p>
	 * For a single target the merge offset is cleared and the query runs as a normal select. Otherwise two rounds of
	 * queries are issued:
	 * <ol>
	 * <li>the SQL is rewritten by {@code ShardLimitSqlSplitRewrite} using the number of physical SQLs and executed
	 * on every target; targets that return no rows additionally get a count query;</li>
	 * <li>the smallest first row and the largest last row of the first round are used by
	 * {@code ShardLimitSqlWithConditionRewrite} to build a second SQL, which is executed on every target.</li>
	 * </ol>
	 * Both rounds, the offsets, the limit and the optional counts are then handed to {@code srs.init(...)}.
	 *
	 * @param srs
	 *           the shard result set to populate
	 * @param sql
	 *           the original logical query SQL
	 * @param routerTarget
	 *           the routing result of the original SQL
	 * @return {@code srs}, or the result of {@code normalSelectExecute} for a single target
	 * @throws SQLException
	 *            if parsing, routing or any physical query fails
	 */
	private ResultSet executeOrderyByLimitQuery(ShardResultSet srs, String sql, RouterResult routerTarget)
	      throws SQLException {

		if (isSingleTarget(routerTarget)) {
			// Single-target query: clear the merge offset (NO_OFFSET) and execute the original sql
			routerTarget.getMergeContext().setOffset(MergeContext.NO_OFFSET);
			return normalSelectExecute(srs, sql, routerTarget);
		}

		ShardResultSetMerger shardResultSetMerger = new ShardResultSetMerger();
		OrderByDataMerger orderByDataMerger = new OrderByDataMerger();
		List<List<RowData>> firstResult = new ArrayList<List<RowData>>();
		List<List<RowData>> secondResult = new ArrayList<List<RowData>>();
		List<RowData> startOffsetDataList = new ArrayList<RowData>();
		List<RowData> endOffsetDataList = new ArrayList<RowData>();
		List<Long> countResult = null;
		ArrayList<RouterTarget> emptyFirstResultRouter = new ArrayList<RouterTarget>();
		RowData startData, endData;
		int splitNum = 0;

		SQLParsedResult parseResult = SQLParser.parseWithoutCache(sql);
		// splitNum is the total number of physical sqls over all targets
		for (RouterTarget target : routerTarget.getSqls()) {
			splitNum += target.getSqls().size();
		}

		// Split the offset according to the number of routed sqls
		String splitLimitSql = new ShardLimitSqlSplitRewrite().rewrite(parseResult, splitNum, null);
		// NOTE: this local "router" (a RouterResult) shadows the ShardRouter field of the same name
		RouterResult router = routingAndCheck(splitLimitSql, null);
		List<ResultSet> firstResultSets = executeQueryByOriginal(router);

		// Collect the rows returned for each route, in route order; they are matched against the
		// second-round query data later.
		// Remember the routes that returned nothing; count queries are issued for them below.
		int i = 0;
		for (ResultSet rs : firstResultSets) {
			List<RowData> rowDatas = shardResultSetMerger.popResultSet(rs, router.getMergeContext());
			firstResult.add(rowDatas);
			if (rowDatas.size() > 0) {
				startOffsetDataList.add(rowDatas.get(0));
				endOffsetDataList.add(rowDatas.get(rowDatas.size() - 1));
			} else {
				emptyFirstResultRouter.add(router.getTargetByOffset(i));
			}
			i++;
		}

		// If the first round returned no data at all, the overall result is empty
		boolean isEmpty = true;
		for (List<RowData> data : firstResult) {
			if (data.size() != 0) {
				isEmpty = false;
				break;
			}
		}
		if (isEmpty) {
			srs.setResultSets(Arrays.asList(firstResultSets.get(0)));
			return srs;
		}
		if (!emptyFirstResultRouter.isEmpty()) {
			// Replace the first sql of each empty route, in place, with its count rewrite
			for (RouterTarget emptyRouter : emptyFirstResultRouter) {
				emptyRouter.getSqls().add(
				      new SqlToCountSqlRewrite().rewrite(((ArrayList<String>) emptyRouter.getSqls()).remove(0), null));
			}
			countResult = new ArrayList<Long>(emptyFirstResultRouter.size());
			RouterResult emptyRouterResult = new RouterResult();
			emptyRouterResult.setSqls(emptyFirstResultRouter);
			List<ResultSet> countResultSets = executeQueryByOriginal(emptyRouterResult);
			for (ResultSet countResultSet : countResultSets) {
				countResultSet.next();
				countResult.add(countResultSet.getLong(SqlToCountSqlRewrite.countAlias));
			}
		}

		// Sort the first rows and the last rows of every shard's data set, then take the overall first
		// and the overall last row
		startOffsetDataList = orderByDataMerger.process(startOffsetDataList, router.getMergeContext());
		endOffsetDataList = orderByDataMerger.process(endOffsetDataList, router.getMergeContext());
		startData = startOffsetDataList.get(0);
		endData = endOffsetDataList.get(endOffsetDataList.size() - 1);

		// Rewrite the SQL using those rows as conditions and run the second-round query
		String limitSql2 = new ShardLimitSqlWithConditionRewrite().rewrite(sql, startData, endData,
		      router.getMergeContext(), null);
		RouterResult router2 = routingAndCheck(limitSql2, null);
		List<ResultSet> secondResultSets = executeQueryByOriginal(router2);

		for (ResultSet rs : secondResultSets) {
			secondResult.add(shardResultSetMerger.popResultSet(rs, router2.getMergeContext()));
		}

		// Let the filters observe the merge step; unlike normalSelectExecute the chain does not call
		// init itself, because init is invoked below with the two-round data.
		if (this.filters != null && this.filters.size() > 0) {
			JdbcFilter chain = new DefaultJdbcFilterChain(this.filters) {
				@Override
				public void shardMerge(ShardResultSet rs, JdbcFilter chain) throws SQLException {
					if (index < filters.size()) {
						filters.get(index++).shardMerge(rs, chain);
					}
				}
			};
			chain.shardMerge(srs, chain);
		}

		srs.init(firstResult, secondResult, router.getMergeContext(), parseResult.getMergeContext().getOffset(), router
		      .getMergeContext().getOffset(), splitNum, parseResult.getMergeContext().getLimit(), countResult);

		return srs;
	}

	/**
	 * Executes the physical queries of a routing result and returns the raw result set of every shard.
	 * <p>
	 * A single physical SQL is executed directly on the calling thread. Otherwise the physical SQLs are wrapped into
	 * {@code StatementExecuteQueryCallable} tasks and handed to {@code SQLThreadPoolExecutor}:
	 * <ul>
	 * <li>when the concurrency level is at most 1 or auto-commit is off, all SQLs of one database form one task and
	 * use the connection returned by {@code getRealConnection};</li>
	 * <li>otherwise the SQLs of one database are distributed round-robin over up to {@code concurrencyLevel} groups,
	 * each group with its own connection from {@code getRealConcurrentConnection} and its own task.</li>
	 * </ul>
	 *
	 * @param routerTarget
	 *           the routing result describing the physical queries
	 * @return the result sets, in the order the tasks were submitted
	 * @throws SQLException
	 *            if any physical query fails
	 */
	private List<ResultSet> executeQueryByOriginal(RouterResult routerTarget) throws SQLException {
		ArrayList<ResultSet> resultList = new ArrayList<ResultSet>();

		if (isSingleTarget(routerTarget)) {
			// if has only one sql,then serial execute it
			for (RouterTarget targetedSql : routerTarget.getSqls()) {
				for (String executableSql : targetedSql.getSqls()) {
					if (LOGGER.isDebugEnabled()) {
						LOGGER.debug("db:" + targetedSql.getDatabaseName());
						LOGGER.debug("sql:" + executableSql);
					}

					Connection conn = connection.getRealConnection(targetedSql.getDatabaseName(), autoCommit);
					Statement stmt = createStatement(conn);
					actualStatements.add(stmt);

					resultList.add(stmt.executeQuery(executableSql));
				}
			}
		} else {
			// if has multiple sqls,then parallel execute them
			List<Callable<List<ResultSet>>> tasks = new ArrayList<Callable<List<ResultSet>>>();

			// a concurrency level supplied by the router result overrides the statement-level one
			int concurrencyLevel = this.concurrencyLevel;
			if (routerTarget.getConcurrencyLevel() > 0) {
				concurrencyLevel = routerTarget.getConcurrencyLevel();
			}

			for (RouterTarget targetedSql : routerTarget.getSqls()) {
				if (concurrencyLevel <= 1 || !autoCommit) {
					Map<String, Statement> stmtSqlMap = new LinkedHashMap<String, Statement>();
					for (String executableSql : targetedSql.getSqls()) {
						Connection conn = connection.getRealConnection(targetedSql.getDatabaseName(), autoCommit);
						Statement stmt = createStatement(conn);
						actualStatements.add(stmt);
						stmtSqlMap.put(executableSql, stmt);
					}
					tasks.add(new StatementExecuteQueryCallable(stmtSqlMap, LocalContextReadWriteStrategy
					      .getReadFromMaster()));
				} else {
					int count = 0;
					Connection[] connections = new Connection[concurrencyLevel];
					connection.resetConcurrentConnectionIndexes();

					List<Map<String, Statement>> stmtSqlMapList = new ArrayList<Map<String, Statement>>(concurrencyLevel);
					for (String executableSql : targetedSql.getSqls()) {
						// round-robin assignment of sqls to connection groups
						int index = count++ % concurrencyLevel;
						Connection conn = connections[index];
						if (conn == null) {
							conn = connection.getRealConcurrentConnection(targetedSql.getDatabaseName(), autoCommit);
							connections[index] = conn;
						}
						Statement stmt = createStatement(conn);
						actualStatements.add(stmt);
						Map<String, Statement> stmtSqlMap = null;
						if (index < stmtSqlMapList.size()) {
							stmtSqlMap = stmtSqlMapList.get(index);
						} else {
							stmtSqlMap = new LinkedHashMap<String, Statement>();
							stmtSqlMapList.add(stmtSqlMap);
						}
						stmtSqlMap.put(executableSql, stmt);
					}
					for (Map<String, Statement> stmtSqlMap : stmtSqlMapList) {
						if (stmtSqlMap != null && !stmtSqlMap.isEmpty()) {
							tasks.add(new StatementExecuteQueryCallable(stmtSqlMap, LocalContextReadWriteStrategy
							      .getReadFromMaster()));
						}
					}
				}
			}

			List<Future<List<ResultSet>>> futures = SQLThreadPoolExecutor.getInstance(true).invokeSQLs(tasks);
			for (Future<List<ResultSet>> f : futures) {
				try {
					resultList.addAll(f.get());
				} catch (Exception e) {
					// normally can't be here!
					if (e instanceof InterruptedException) {
						// keep the interruption visible to the caller instead of swallowing it
						Thread.currentThread().interrupt();
					}
					throw new SQLException(e);
				}
			}
		}

		return resultList;
	}

	/**
	 * Executes the update SQL without any generated-key option.
	 *
	 * @return the number of affected rows
	 */
	@Override
	public int executeUpdate(String sql) throws SQLException {
		return executeUpdateInternal(sql, -1, null, null);
	}

	/**
	 * Executes the update SQL with the given generated-key flag.
	 *
	 * @return the number of affected rows
	 */
	@Override
	public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
		return executeUpdateInternal(sql, autoGeneratedKeys, null, null);
	}

	/**
	 * Executes the update SQL with the given generated-key column indexes.
	 *
	 * @return the number of affected rows
	 */
	@Override
	public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
		return executeUpdateInternal(sql, -1, columnIndexes, null);
	}

	/**
	 * Executes the update SQL with the given generated-key column names.
	 *
	 * @return the number of affected rows
	 */
	@Override
	public int executeUpdate(String sql, String[] columnNames) throws SQLException {
		return executeUpdateInternal(sql, -1, null, columnNames);
	}

	/**
	 * @return the live (not copied) set of result sets currently attached to this statement
	 */
	public Set<ResultSet> getAttachedResultSets() {
		return attachedResultSets;
	}

	/**
	 * @return the shard connection assigned through {@code setConnection}
	 */
	@Override
	public Connection getConnection() throws SQLException {
		return connection;
	}

	/**
	 * @return the generated keys captured by an earlier update, or {@code null} if none were captured
	 */
	@Override
	public ResultSet getGeneratedKeys() throws SQLException {
		return this.generatedKey;
	}

	/**
	 * Multiple results are not tracked by this statement.
	 *
	 * @return always {@code false}
	 */
	@Override
	public boolean getMoreResults() throws SQLException {
		return false;
	}

	/**
	 * @return the result set of the most recent query, or {@code null} after an update or after {@code close()}
	 */
	@Override
	public ResultSet getResultSet() throws SQLException {
		return results;
	}

	/**
	 * @return the configured result set concurrency, or {@code -1} if none was configured
	 */
	@Override
	public int getResultSetConcurrency() throws SQLException {
		return resultSetConcurrency;
	}

	/**
	 * @return the configured result set holdability, or {@code -1} if none was configured
	 */
	@Override
	public int getResultSetHoldability() throws SQLException {
		return resultSetHoldability;
	}

	/**
	 * @return the configured result set type, or {@code -1} if none was configured
	 */
	@Override
	public int getResultSetType() throws SQLException {
		return resultSetType;
	}

	/**
	 * @return the router assigned through {@code setRouter}
	 */
	public ShardRouter getRouter() {
		return router;
	}

	/**
	 * @return the affected-row count of the most recent update, or {@code -1} if the last operation was a query or
	 *         nothing has been executed yet
	 */
	@Override
	public int getUpdateCount() throws SQLException {
		return updateCount;
	}

	/**
	 * @return the auto-commit flag passed on when real connections are requested; {@code true} by default
	 */
	public boolean isAutoCommit() {
		return autoCommit;
	}

	/**
	 * @return {@code true} once {@code close()} has been called
	 */
	@Override
	public boolean isClosed() throws SQLException {
		return closed;
	}

	/**
	 * @return the read-only flag assigned through {@code setReadOnly}
	 */
	public boolean isReadOnly() {
		return readOnly;
	}

	/**
	 * Determines the type of the given SQL by delegating to {@code SqlUtils.getSqlType}.
	 *
	 * @param sql
	 *           the SQL to classify
	 * @return the detected SQL type
	 * @throws SQLException
	 *            if {@code SqlUtils.getSqlType} throws
	 */
	protected SqlType getSqlType(String sql) throws SQLException {
		return SqlUtils.getSqlType(sql);
	}

	/**
	 * Routes the SQL, validates the routing result and notifies the filters' {@code shardRouting} step.
	 * <p>
	 * The filters are notified in the {@code finally} block, i.e. both on success and on failure. On failure (or
	 * when the router returned {@code null}) they receive a placeholder {@code RouterResult} that only carries the
	 * parameters.
	 *
	 * @param sql
	 *           the logical SQL to route
	 * @param params
	 *           the SQL parameters, or {@code null}
	 * @return the validated routing result
	 * @throws SQLException
	 *            wrapping any exception raised by routing or validation, or thrown by a filter
	 */
	protected RouterResult routingAndCheck(String sql, final List<Object> params) throws SQLException {
		RouterResult routerTarget = null;

		try {
			routerTarget = router.router(sql, params);
			executableCheck(routerTarget);

			return routerTarget;
		} catch (Exception e) {
			// every failure, including an SQLException from executableCheck, is wrapped in a new SQLException
			throw new SQLException(e);
		} finally {
			// routing failed or returned null: give the filters a placeholder that carries the params
			if (routerTarget == null) {
				routerTarget = new RouterResult();
				routerTarget.setParams(params);
			}

			if (this.filters != null && this.filters.size() > 0) {
				JdbcFilter chain = new DefaultJdbcFilterChain(this.filters) {
					@Override
					public void shardRouting(RouterResult rs, JdbcFilter chain) throws SQLException {
						if (index < filters.size()) {
							filters.get(index++).shardRouting(rs, chain);
						}
					}
				};

				// NOTE(review): an exception thrown here replaces the one thrown from the try/catch above
				chain.shardRouting(routerTarget, chain);
			}
		}
	}

	/**
	 * Tells whether a routing result resolves to at most one physical SQL on at most one target, in which case it
	 * is executed serially instead of through the thread pool.
	 *
	 * @param rr
	 *           the routing result; expected to contain at least one target
	 * @return {@code false} if there is more than one target or the first target has more than one SQL,
	 *         {@code true} otherwise
	 */
	protected boolean isSingleTarget(RouterResult rr) {
		if (rr.getSqls().size() > 1) {
			return false;
		}

		RouterTarget onlyTarget = rr.getSqls().get(0);
		return !(onlyTarget.getSqls().size() > 1);
	}

	/**
	 * Replaces the set of attached result sets; the given set is stored as-is, not copied.
	 */
	public void setAttachedResultSets(Set<ResultSet> attachedResultSets) {
		this.attachedResultSets = attachedResultSets;
	}

	/**
	 * Sets the auto-commit flag that is passed on when real connections are requested.
	 */
	public void setAutoCommit(boolean autoCommit) {
		this.autoCommit = autoCommit;
	}

	/**
	 * Sets the shard connection from which real connections are obtained.
	 */
	public void setConnection(ShardConnection dpConnection) {
		this.connection = dpConnection;
	}

	/**
	 * Sets the read-only flag returned by {@code isReadOnly}.
	 */
	public void setReadOnly(boolean readOnly) {
		this.readOnly = readOnly;
	}

	/**
	 * Sets the result set concurrency used when physical statements are created; {@code -1} means "not configured".
	 */
	public void setResultSetConcurrency(int resultSetConcurrency) {
		this.resultSetConcurrency = resultSetConcurrency;
	}

	/**
	 * Sets the result set holdability used when physical statements are created; {@code -1} means "not configured".
	 */
	public void setResultSetHoldability(int resultSetHoldability) {
		this.resultSetHoldability = resultSetHoldability;
	}

	/**
	 * Sets the result set type used when physical statements are created; {@code -1} means "not configured".
	 */
	public void setResultSetType(int resultSetType) {
		this.resultSetType = resultSetType;
	}

	/**
	 * Sets the router used by {@code routingAndCheck} to resolve logical SQL into physical targets.
	 */
	public void setRouter(ShardRouter router) {
		this.router = router;
	}

	/**
	 * Sets the per-database concurrency level used for multi-target execution when the routing result does not
	 * supply a positive level of its own.
	 */
	public void setConcurrencyLevel(int concurrencyLevel) {
		this.concurrencyLevel = concurrencyLevel;
	}
}
